package com.shujia.stream

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object JIchaControl {
  def main(args: Array[String]): Unit = {
    //环境设置对象
    val settings: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()

    /**
     * flink sql环境
     *
     */
    val table: TableEnvironment = TableEnvironment.create(settings)


    /**
     * 创建  卡口过车source 表
     *
     */

    table.executeSql(
      """
        |CREATE TABLE cars (
        |    car STRING,
        |    city_code STRING,
        |    county_code STRING,
        |    card BIGINT,
        |    camera_id STRING,
        |    orientation STRING,
        |    road_id BIGINT,
        |    `time` BIGINT,
        |    speed DOUBLE,
        |    proc_time as PROCTIME()
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'car',
        |  'properties.bootstrap.servers' = 'master:9092',
        |  'properties.group.id' = 'testGroup',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'format' = 'json'
        |)
        |""".stripMargin)

    /**
     * 创建布控列表维表
     *
     */

    table.executeSql(
      """
        |
        |
        |
        |CREATE TABLE control_list (
        |    `car` STRING,
        |    PRIMARY KEY (car) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://master:3306/car?useUnicode=true&characterEncoding=UTF-8',
        |   'table-name' = 'control_list',
        |    'username' ='root',
        |    'password'='123456'
        |)
        |
        |
        |""".stripMargin)

    /**
     * 创建保存布控结果sink表
     *
     */
    table.executeSql(
      """
        |
        |
        |CREATE TABLE control_cars (
        |    car STRING,
        |    city_code STRING,
        |    county_code STRING,
        |    card BIGINT,
        |    camera_id STRING,
        |    orientation STRING,
        |    road_id BIGINT,
        |    `time` BIGINT,
        |    speed DOUBLE,
        |    PRIMARY KEY (car,`time`) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://master:3306/car?useUnicode=true&characterEncoding=UTF-8',
        |   'table-name' = 'control_cars',
        |    'username' ='root',
        |    'password'='123456'
        |)
        |
        |""".stripMargin)

    /**
     * 缉查布控
     *
     */

    table.executeSql(
      """
        |
        |insert into control_cars
        |select
        |    a.car,
        |    a.city_code,
        |    a.county_code,
        |    a.card,
        |    a.camera_id,
        |    a.orientation,
        |    a.road_id,
        |    a.`time`,
        |    a.speed
        |from
        |    cars as a
        |    inner join
        |    control_list FOR SYSTEM_TIME AS OF a.proc_time  as b
        |    on a.car=b.car
        |
        |""".stripMargin)

  }

}
